feat: curio: storage index gc task (#11884)

* curio storage path gc: lay out the structure
* curio gc: Implement storage metadata gc
* move bored singleton task impl to harmonytask
* curio: run storage gc task on storage node
* make gen

This commit is contained in:
parent fd7f1a95e2
commit c785e59371
@@ -18,6 +18,7 @@ import (
 	curio "github.com/filecoin-project/lotus/curiosrc"
 	"github.com/filecoin-project/lotus/curiosrc/chainsched"
 	"github.com/filecoin-project/lotus/curiosrc/ffi"
+	"github.com/filecoin-project/lotus/curiosrc/gc"
 	"github.com/filecoin-project/lotus/curiosrc/message"
 	"github.com/filecoin-project/lotus/curiosrc/piece"
 	"github.com/filecoin-project/lotus/curiosrc/seal"
@@ -136,6 +137,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
 		}
 	}
 
+	if hasAnySealingTask {
+		// Sealing nodes maintain storage index when bored
+		storageEndpointGcTask := gc.NewStorageEndpointGC(si, stor, db)
+		activeTasks = append(activeTasks, storageEndpointGcTask)
+	}
+
 	if needProofParams {
 		for spt := range dependencies.ProofTypes {
 			if err := modules.GetParams(true)(spt); err != nil {
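The new block registers the GC task only on nodes that already run sealing tasks, and nothing schedules it directly: the engine invokes it through its IAmBored hook (see TypeDetails in the new file below) whenever the machine has spare capacity. As a rough sketch of what happens right after this hunk, the assembled activeTasks slice is handed to the harmonytask engine; the New call shown here is an assumption for illustration, not part of this diff.

// Sketch only: handing the task list to the engine, which then drives
// CanAccept/Do and the IAmBored callbacks. The exact harmonytask.New
// signature is assumed, not taken from this commit.
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
if err != nil {
	return nil, xerrors.Errorf("failed to create task engine: %w", err)
}
return ht, nil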
curiosrc/gc/storage_endpoint_gc.go (new file, 276 lines)
@@ -0,0 +1,276 @@
package gc

import (
	"context"
	"os"
	"strings"
	"sync"
	"time"

	logging "github.com/ipfs/go-log/v2"
	"github.com/samber/lo"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/lib/harmony/resources"
	"github.com/filecoin-project/lotus/lib/result"
	"github.com/filecoin-project/lotus/storage/paths"
	"github.com/filecoin-project/lotus/storage/sealer/fsutil"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("curiogc")

const StorageEndpointGCInterval = 2 * time.Minute // todo bump post testing
const StorageEndpointDeadTime = 15 * time.Minute
const MaxParallelEndpointChecks = 32

type StorageEndpointGC struct {
	si     *paths.DBIndex
	remote *paths.Remote
	db     *harmonydb.DB
}

func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC {
	return &StorageEndpointGC{
		si:     si,
		remote: remote,
		db:     db,
	}
}

func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	/*
		1. Get all storage paths + urls (endpoints)
		2. Ping each url, record results
		3. Update sector_path_url_liveness with success/failure
		4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table
		4.2 Remove storage paths with no URLs remaining
		4.2.1 in the same transaction remove sector refs to the dead path
	*/

	ctx := context.Background()

	var pathRefs []struct {
		StorageID     storiface.ID `db:"storage_id"`
		Urls          string       `db:"urls"`
		LastHeartbeat *time.Time   `db:"last_heartbeat"`
	}

	err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`)
	if err != nil {
		return false, xerrors.Errorf("getting path metadata: %w", err)
	}

	type pingResult struct {
		storageID storiface.ID
		url       string

		res result.Result[fsutil.FsStat]
	}

	var pingResults []pingResult
	var resultLk sync.Mutex
	var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks)

	for _, pathRef := range pathRefs {
		pathRef := pathRef
		urls := strings.Split(pathRef.Urls, paths.URLSeparator)

		for _, url := range urls {
			url := url

			select {
			case resultThrottle <- struct{}{}:
			case <-ctx.Done():
				return false, ctx.Err()
			}

			go func() {
				defer func() {
					<-resultThrottle
				}()

				st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID)

				res := pingResult{
					storageID: pathRef.StorageID,
					url:       url,
					res:       result.Wrap(st, err),
				}

				resultLk.Lock()
				pingResults = append(pingResults, res)
				resultLk.Unlock()
			}()
		}
	}

	// Wait for all pings to finish
	for i := 0; i < MaxParallelEndpointChecks; i++ {
		select {
		case resultThrottle <- struct{}{}:
		case <-ctx.Done():
			return false, ctx.Err()
		}
	}

	// Update the liveness table
	/*
		create table sector_path_url_liveness (
			storage_id text,
			url text,

			last_checked timestamp not null,
			last_live timestamp,
			last_dead timestamp,
			last_dead_reason text,

			primary key (storage_id, url),

			foreign key (storage_id) references storage_path (storage_id) on delete cascade
		)
	*/

	currentTime := time.Now().UTC()

	committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		for _, pingResult := range pingResults {
			var lastLive, lastDead, lastDeadReason interface{}
			if pingResult.res.Error == nil {
				lastLive = currentTime.UTC()
				lastDead = nil
				lastDeadReason = nil
			} else {
				lastLive = nil
				lastDead = currentTime.UTC()
				lastDeadReason = pingResult.res.Error.Error()
			}

			_, err := tx.Exec(`
				INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason)
				VALUES ($1, $2, $3, $4, $5, $6)
				ON CONFLICT (storage_id, url) DO UPDATE
				SET last_checked = EXCLUDED.last_checked,
					last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live),
					last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead),
					last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason)
			`, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason)
			if err != nil {
				return false, xerrors.Errorf("updating liveness data: %w", err)
			}
		}

		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return false, xerrors.Errorf("sector_path_url_liveness update: %w", err)
	}
	if !committed {
		return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit")
	}

	///////
	// Now we do the actual database cleanup
	if !stillOwned() {
		return false, xerrors.Errorf("task no longer owned")
	}

	committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		// Identify URLs that are consistently down
		var deadURLs []struct {
			StorageID storiface.ID
			URL       string
		}
		err = tx.Select(&deadURLs, `
			SELECT storage_id, url FROM sector_path_url_liveness
			WHERE last_dead > last_live AND last_dead < $1
		`, currentTime.Add(-StorageEndpointDeadTime).UTC())
		if err != nil {
			return false, xerrors.Errorf("selecting dead URLs: %w", err)
		}

		log.Debugw("dead urls", "dead_urls", deadURLs)

		// Remove dead URLs from storage_path entries and handle path cleanup
		for _, du := range deadURLs {
			// Fetch the current URLs for the storage path
			var currentPath struct {
				URLs string
			}
			err = tx.Get(&currentPath, "SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID)
			if err != nil {
				return false, xerrors.Errorf("fetching storage paths: %w", err)
			}

			// Filter out the dead URL using lo.Reject and prepare the updated list
			urls := strings.Split(currentPath.URLs, paths.URLSeparator)
			urls = lo.Reject(urls, func(u string, _ int) bool {
				return u == du.URL
			})

			log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID)

			if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing
				log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", currentPath.URLs, "dead_urls", deadURLs)
				continue
			}

			if len(urls) == 0 {
				// If no URLs left, remove the storage path entirely
				_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("deleting storage path: %w", err)
				}
				_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("deleting sector locations: %w", err)
				}
			} else {
				// Update the storage path with the filtered URLs
				newURLs := strings.Join(urls, paths.URLSeparator)
				_, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("updating storage path urls: %w", err)
				}
			}
		}

		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err)
	}
	if !committed {
		return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit")
	}

	return true, nil
}

func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	id := ids[0]
	return &id, nil
}

func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails {
	return harmonytask.TaskTypeDetails{
		Max:  1,
		Name: "StorageMetaGC",
		Cost: resources.Resources{
			Cpu: 1,
			Ram: 64 << 20,
			Gpu: 0,
		},
		IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s),
	}
}

func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) {
	// lazy endpoint, added when bored
	return
}

var _ harmonytask.TaskInterface = &StorageEndpointGC{}
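Two details of Do are worth pulling out. Endpoint checks are bounded by a buffered channel used as a semaphore: a worker goroutine starts only after acquiring a slot, and the "wait for all pings" loop joins by re-acquiring all MaxParallelEndpointChecks slots, which can only succeed once every worker has released its slot. A standalone sketch of that pattern, with illustrative names:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxParallel = 4 // stands in for MaxParallelEndpointChecks
	throttle := make(chan struct{}, maxParallel)

	for i := 0; i < 10; i++ {
		i := i
		throttle <- struct{}{} // acquire a slot; blocks while maxParallel workers run
		go func() {
			defer func() { <-throttle }()     // release the slot when done
			time.Sleep(10 * time.Millisecond) // stands in for the remote stat call
			fmt.Println("checked endpoint", i)
		}()
	}

	// Join: holding every slot means no worker can still be running.
	for i := 0; i < maxParallel; i++ {
		throttle <- struct{}{}
	}
	fmt.Println("all checks finished")
}

Note also that the destructive phase runs in a second transaction guarded by stillOwned(), so a node that lost ownership of the task between the two phases cannot delete paths based on its earlier ping results.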
@@ -0,0 +1,8 @@
create table harmony_task_singletons (
    task_name varchar(255) not null,
    task_id bigint,
    last_run_time timestamp,

    primary key (task_name),
    foreign key (task_id) references harmony_task (id) on delete set null
);
lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql (new file, 13 lines)
@@ -0,0 +1,13 @@
create table sector_path_url_liveness (
    storage_id text,
    url text,

    last_checked timestamp not null,
    last_live timestamp,
    last_dead timestamp,
    last_dead_reason text,

    primary key (storage_id, url),

    foreign key (storage_id) references storage_path (storage_id) on delete cascade
)
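The upsert in the GC task never rewinds this history: last_checked is overwritten on every round, while the COALESCE clauses advance last_live and last_dead only when the new row carries a non-null value, so whichever timestamp was not observed this round keeps its previous value. A URL then counts as dead once last_dead > last_live and the failure is older than StorageEndpointDeadTime. A standalone Go sketch of the merge rule, with illustrative types:

package main

import (
	"fmt"
	"time"
)

// liveness mirrors one sector_path_url_liveness row (illustration only).
type liveness struct {
	lastChecked time.Time
	lastLive    *time.Time
	lastDead    *time.Time
}

// merge applies the same rule as the ON CONFLICT ... COALESCE upsert:
// a field is advanced only when the new check actually produced it.
func merge(old liveness, now time.Time, alive bool) liveness {
	out := liveness{lastChecked: now, lastLive: old.lastLive, lastDead: old.lastDead}
	if alive {
		out.lastLive = &now
	} else {
		out.lastDead = &now
	}
	return out
}

func main() {
	t0 := time.Now()
	row := merge(liveness{}, t0, true)              // first check: live
	row = merge(row, t0.Add(5*time.Minute), false)  // later check: dead
	fmt.Println(row.lastLive.Before(*row.lastDead)) // true: the failure is newer
}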
@@ -236,6 +236,10 @@ func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) e
 	return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...)
 }
 
+func (t *Tx) Get(s any, sql rawStringOnly, arguments ...any) error {
+	return pgxscan.Get(t.ctx, t.Tx, s, string(sql), arguments...)
+}
+
 func IsErrUniqueContraint(err error) bool {
 	var e2 *pgconn.PgError
 	return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
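Tx.Get is the single-row counterpart of Tx.Select: pgxscan.Get scans exactly one row into a struct, and the GC task above relies on it to fetch the urls column for a single storage_id. A hypothetical usage sketch (the helper and query are illustrative, and the harmonydb package is assumed to be imported):

// countPaths is a hypothetical helper, not part of this diff.
func countPaths(tx *harmonydb.Tx) (int64, error) {
	var row struct {
		Count int64 `db:"count"`
	}
	if err := tx.Get(&row, `SELECT count(*) AS count FROM storage_path`); err != nil {
		return 0, err
	}
	return row.Count, nil
}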
lib/harmony/harmonytask/singleton_task.go (new file, 52 lines)
@@ -0,0 +1,52 @@
package harmonytask

import (
	"errors"
	"time"

	"github.com/jackc/pgx/v5"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/passcall"
)

func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error {
	return passcall.Every(minInterval, func(add AddTaskFunc) error {
		taskName := task.TypeDetails().Name

		add(func(taskID TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
			var existingTaskID *int64
			var lastRunTime time.Time

			// Query to check the existing task entry
			err = tx.QueryRow(`SELECT task_id, last_run_time FROM harmony_task_singletons WHERE task_name = $1`, taskName).Scan(&existingTaskID, &lastRunTime)
			if err != nil {
				if !errors.Is(err, pgx.ErrNoRows) {
					return false, err // return error if query failed and it's not because of missing row
				}
			}

			now := time.Now()
			// Determine if the task should run based on the absence of a record or outdated last_run_time
			shouldRun := err == pgx.ErrNoRows || (existingTaskID == nil && lastRunTime.Add(minInterval).Before(now))
			if !shouldRun {
				return false, nil
			}

			// Conditionally insert or update the task entry
			n, err := tx.Exec(`
				INSERT INTO harmony_task_singletons (task_name, task_id, last_run_time)
				VALUES ($1, $2, $3)
				ON CONFLICT (task_name) DO UPDATE
				SET task_id = COALESCE(harmony_task_singletons.task_id, $2),
					last_run_time = $3
				WHERE harmony_task_singletons.task_id IS NULL
			`, taskName, taskID, now)
			if err != nil {
				return false, err
			}
			return n > 0, nil
		})
		return nil
	})
}
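The predicate packs the cluster-wide singleton logic into one line: run when no row exists yet (pgx.ErrNoRows), or when no node currently holds the task (task_id is NULL) and the last run is older than minInterval. The upsert then claims the slot, and its WHERE harmony_task_singletons.task_id IS NULL guard plus the n > 0 check ensure that when several nodes race, only one add commits. A standalone sketch of the decision, with illustrative parameter names:

package main

import (
	"fmt"
	"time"
)

// shouldRun mirrors the predicate in SingletonTaskAdder: noRow stands for
// pgx.ErrNoRows, claimed for a non-NULL task_id.
func shouldRun(noRow, claimed bool, lastRun time.Time, minInterval time.Duration, now time.Time) bool {
	return noRow || (!claimed && lastRun.Add(minInterval).Before(now))
}

func main() {
	now := time.Now()
	interval := 2 * time.Minute

	fmt.Println(shouldRun(true, false, time.Time{}, interval, now))               // true: never ran
	fmt.Println(shouldRun(false, true, now.Add(-time.Hour), interval, now))       // false: another node holds it
	fmt.Println(shouldRun(false, false, now.Add(-time.Hour), interval, now))      // true: unclaimed and overdue
	fmt.Println(shouldRun(false, false, now.Add(-30*time.Second), interval, now)) // false: ran too recently
}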
lib/passcall/every.go (new file, 28 lines)
@@ -0,0 +1,28 @@
package passcall

import (
	"sync"
	"time"
)

// Every is a helper function that will call the provided callback
// function at most once every `passInterval` duration. If the function is
// called more frequently than that, it will return the zero value of R
// without calling the callback.
func Every[P, R any](passInterval time.Duration, cb func(P) R) func(P) R {
	var lastCall time.Time
	var lk sync.Mutex

	return func(param P) R {
		lk.Lock()
		defer lk.Unlock()

		if time.Since(lastCall) < passInterval {
			return *new(R)
		}

		defer func() {
			lastCall = time.Now()
		}()
		return cb(param)
	}
}
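Every is generic over both the parameter and return type, so a skipped call returns the zero value of R (nil for an error return). A minimal usage sketch, assuming this package is importable from a Go module that depends on lotus:

package main

import (
	"fmt"
	"time"

	"github.com/filecoin-project/lotus/lib/passcall"
)

func main() {
	// Wrap a callback so repeated calls within one second become no-ops.
	tick := passcall.Every(time.Second, func(name string) error {
		fmt.Println("ran for", name)
		return nil
	})

	_ = tick("a") // runs
	_ = tick("b") // skipped: inside the interval, returns the zero value (nil)
	time.Sleep(1100 * time.Millisecond)
	_ = tick("c") // runs again
}

This is how SingletonTaskAdder avoids hammering the database: the engine may call the IAmBored hook often, but the wrapped closure only reaches the harmony_task_singletons query once per minInterval.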
@@ -416,49 +416,55 @@ func (r *Remote) FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, er
 	}
 
 	for _, urlStr := range si.URLs {
-		rl, err := url.Parse(urlStr)
+		out, err := r.StatUrl(ctx, urlStr, id)
 		if err != nil {
-			log.Warnw("failed to parse URL", "url", urlStr, "error", err)
-			continue // Try the next URL
+			log.Warnw("stat url failed", "url", urlStr, "error", err)
+			continue
 		}
 
-		rl.Path = gopath.Join(rl.Path, "stat", string(id))
-
-		req, err := http.NewRequest("GET", rl.String(), nil)
-		if err != nil {
-			log.Warnw("creating request failed", "url", rl.String(), "error", err)
-			continue // Try the next URL
-		}
-		req.Header = r.auth
-		req = req.WithContext(ctx)
-
-		resp, err := http.DefaultClient.Do(req)
-		if err != nil {
-			log.Warnw("request failed", "url", rl.String(), "error", err)
-			continue // Try the next URL
-		}
-
-		if resp.StatusCode == 200 {
-			var out fsutil.FsStat
-			if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
-				_ = resp.Body.Close()
-				log.Warnw("decoding response failed", "url", rl.String(), "error", err)
-				continue // Try the next URL
-			}
-			_ = resp.Body.Close()
-			return out, nil // Successfully decoded, return the result
-		}
-
-		// non-200 status code
-		b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging
-		log.Warnw("request to endpoint failed", "url", rl.String(), "statusCode", resp.StatusCode, "response", string(b))
-		_ = resp.Body.Close()
-		// Continue to try the next URL, don't return here as we want to try all URLs
+		return out, nil
 	}
 
 	return fsutil.FsStat{}, xerrors.Errorf("all endpoints failed for remote storage %s", id)
 }
 
+func (r *Remote) StatUrl(ctx context.Context, urlStr string, id storiface.ID) (fsutil.FsStat, error) {
+	rl, err := url.Parse(urlStr)
+	if err != nil {
+		return fsutil.FsStat{}, xerrors.Errorf("parsing URL: %w", err)
+	}
+
+	rl.Path = gopath.Join(rl.Path, "stat", string(id))
+
+	req, err := http.NewRequest("GET", rl.String(), nil)
+	if err != nil {
+		return fsutil.FsStat{}, xerrors.Errorf("creating request failed: %w", err)
+	}
+	req.Header = r.auth
+	req = req.WithContext(ctx)
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err)
+	}
+
+	if resp.StatusCode == 200 {
+		var out fsutil.FsStat
+		if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
+			_ = resp.Body.Close()
+			return fsutil.FsStat{}, xerrors.Errorf("decoding response failed: %w", err)
+		}
+		_ = resp.Body.Close()
+		return out, nil // Successfully decoded, return the result
+	}
+
+	// non-200 status code
+	b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging
+	_ = resp.Body.Close()
+
+	return fsutil.FsStat{}, xerrors.Errorf("endpoint failed %s: %d %s", rl.String(), resp.StatusCode, string(b))
+}
+
 func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
 	if len(r.limit) >= cap(r.limit) {
 		log.Infof("Throttling remote read, %d already running", len(r.limit))