diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 3b262efa8..71923018d 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -18,6 +18,7 @@ import ( curio "github.com/filecoin-project/lotus/curiosrc" "github.com/filecoin-project/lotus/curiosrc/chainsched" "github.com/filecoin-project/lotus/curiosrc/ffi" + "github.com/filecoin-project/lotus/curiosrc/gc" "github.com/filecoin-project/lotus/curiosrc/message" "github.com/filecoin-project/lotus/curiosrc/piece" "github.com/filecoin-project/lotus/curiosrc/seal" @@ -136,6 +137,12 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + if hasAnySealingTask { + // Sealing nodes maintain storage index when bored + storageEndpointGcTask := gc.NewStorageEndpointGC(si, stor, db) + activeTasks = append(activeTasks, storageEndpointGcTask) + } + if needProofParams { for spt := range dependencies.ProofTypes { if err := modules.GetParams(true)(spt); err != nil { diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go new file mode 100644 index 000000000..8c6e6a0bf --- /dev/null +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -0,0 +1,276 @@ +package gc + +import ( + "context" + "os" + "strings" + "sync" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/result" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("curiogc") + +const StorageEndpointGCInterval = 2 * time.Minute // todo bump post testing +const StorageEndpointDeadTime = 15 * time.Minute +const MaxParallelEndpointChecks = 32 + +type StorageEndpointGC struct { + si *paths.DBIndex + remote *paths.Remote + db *harmonydb.DB +} + +func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC { + return &StorageEndpointGC{ + si: si, + remote: remote, + db: db, + } +} + +func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + /* + 1. Get all storage paths + urls (endpoints) + 2. Ping each url, record results + 3. Update sector_path_url_liveness with success/failure + 4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table + 4.2 Remove storage paths with no URLs remaining + 4.2.1 in the same transaction remove sector refs to the dead path + */ + + ctx := context.Background() + + var pathRefs []struct { + StorageID storiface.ID `db:"storage_id"` + Urls string `db:"urls"` + LastHeartbeat *time.Time `db:"last_heartbeat"` + } + + err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`) + if err != nil { + return false, xerrors.Errorf("getting path metadata: %w", err) + } + + type pingResult struct { + storageID storiface.ID + url string + + res result.Result[fsutil.FsStat] + } + + var pingResults []pingResult + var resultLk sync.Mutex + var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks) + + for _, pathRef := range pathRefs { + pathRef := pathRef + urls := strings.Split(pathRef.Urls, paths.URLSeparator) + + for _, url := range urls { + url := url + + select { + case resultThrottle <- struct{}{}: + case <-ctx.Done(): + return false, ctx.Err() + } + + go func() { + defer func() { + <-resultThrottle + }() + + st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID) + + res := pingResult{ + storageID: pathRef.StorageID, + url: url, + res: result.Wrap(st, err), + } + + resultLk.Lock() + pingResults = append(pingResults, res) + resultLk.Unlock() + }() + } + } + + // Wait for all pings to finish + for i := 0; i < MaxParallelEndpointChecks; i++ { + select { + case resultThrottle <- struct{}{}: + case <-ctx.Done(): + return false, ctx.Err() + } + } + + // Update the liveness table + + /* + create table sector_path_url_liveness ( + storage_id text, + url text, + + last_checked timestamp not null, + last_live timestamp, + last_dead timestamp, + last_dead_reason text, + + primary key (storage_id, url), + + foreign key (storage_id) references storage_path (storage_id) on delete cascade + ) + */ + + currentTime := time.Now().UTC() + + committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + for _, pingResult := range pingResults { + var lastLive, lastDead, lastDeadReason interface{} + if pingResult.res.Error == nil { + lastLive = currentTime.UTC() + lastDead = nil + lastDeadReason = nil + } else { + lastLive = nil + lastDead = currentTime.UTC() + lastDeadReason = pingResult.res.Error.Error() + } + + _, err := tx.Exec(` + INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (storage_id, url) DO UPDATE + SET last_checked = EXCLUDED.last_checked, + last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live), + last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead), + last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason) + `, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason) + if err != nil { + return false, xerrors.Errorf("updating liveness data: %w", err) + } + } + + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return false, xerrors.Errorf("sector_path_url_liveness update: %w", err) + } + if !committed { + return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit") + } + + /////// + // Now we do the actual database cleanup + if !stillOwned() { + return false, xerrors.Errorf("task no longer owned") + } + + committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Identify URLs that are consistently down + var deadURLs []struct { + StorageID storiface.ID + URL string + } + err = tx.Select(&deadURLs, ` + SELECT storage_id, url FROM sector_path_url_liveness + WHERE last_dead > last_live AND last_dead < $1 + `, currentTime.Add(-StorageEndpointDeadTime).UTC()) + if err != nil { + return false, xerrors.Errorf("selecting dead URLs: %w", err) + } + + log.Debugw("dead urls", "dead_urls", deadURLs) + + // Remove dead URLs from storage_path entries and handle path cleanup + for _, du := range deadURLs { + // Fetch the current URLs for the storage path + var currentPath struct { + URLs string + } + err = tx.Get(¤tPath, "SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("fetching storage paths: %w", err) + } + + // Filter out the dead URL using lo.Reject and prepare the updated list + urls := strings.Split(currentPath.URLs, paths.URLSeparator) + urls = lo.Reject(urls, func(u string, _ int) bool { + return u == du.URL + }) + + log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID) + + if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing + log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", currentPath.URLs, "dead_urls", deadURLs) + continue + } + + if len(urls) == 0 { + // If no URLs left, remove the storage path entirely + _, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("deleting storage path: %w", err) + } + _, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID) + if err != nil { + return false, xerrors.Errorf("deleting sector locations: %w", err) + } + } else { + // Update the storage path with the filtered URLs + newURLs := strings.Join(urls, paths.URLSeparator) + _, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID) + if err != nil { + return false, xerrors.Errorf("updating storage path urls: %w", err) + } + } + } + + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err) + } + if !committed { + return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit") + } + + return true, nil +} + +func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := ids[0] + return &id, nil +} + +func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 1, + Name: "StorageMetaGC", + Cost: resources.Resources{ + Cpu: 1, + Ram: 64 << 20, + Gpu: 0, + }, + IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s), + } +} + +func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) { + // lazy endpoint, added when bored + return +} + +var _ harmonytask.TaskInterface = &StorageEndpointGC{} diff --git a/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql b/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql new file mode 100644 index 000000000..d565cfa47 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240416-harmony_singleton_task.sql @@ -0,0 +1,8 @@ +create table harmony_task_singletons ( + task_name varchar(255) not null, + task_id bigint, + last_run_time timestamp, + + primary key (task_name), + foreign key (task_id) references harmony_task (id) on delete set null +); diff --git a/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql b/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql new file mode 100644 index 000000000..e9771d9f3 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240417-sector_index_gc.sql @@ -0,0 +1,13 @@ +create table sector_path_url_liveness ( + storage_id text, + url text, + + last_checked timestamp not null, + last_live timestamp, + last_dead timestamp, + last_dead_reason text, + + primary key (storage_id, url), + + foreign key (storage_id) references storage_path (storage_id) on delete cascade +) diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 759cbd322..5cca8de57 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -236,6 +236,10 @@ func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) e return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...) } +func (t *Tx) Get(s any, sql rawStringOnly, arguments ...any) error { + return pgxscan.Get(t.ctx, t.Tx, s, string(sql), arguments...) +} + func IsErrUniqueContraint(err error) bool { var e2 *pgconn.PgError return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation diff --git a/lib/harmony/harmonytask/singleton_task.go b/lib/harmony/harmonytask/singleton_task.go new file mode 100644 index 000000000..a7b2d45a2 --- /dev/null +++ b/lib/harmony/harmonytask/singleton_task.go @@ -0,0 +1,52 @@ +package harmonytask + +import ( + "errors" + "time" + + "github.com/jackc/pgx/v5" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/passcall" +) + +func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddTaskFunc) error { + return passcall.Every(minInterval, func(add AddTaskFunc) error { + taskName := task.TypeDetails().Name + + add(func(taskID TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + var existingTaskID *int64 + var lastRunTime time.Time + + // Query to check the existing task entry + err = tx.QueryRow(`SELECT task_id, last_run_time FROM harmony_task_singletons WHERE task_name = $1`, taskName).Scan(&existingTaskID, &lastRunTime) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return false, err // return error if query failed and it's not because of missing row + } + } + + now := time.Now() + // Determine if the task should run based on the absence of a record or outdated last_run_time + shouldRun := err == pgx.ErrNoRows || (existingTaskID == nil && lastRunTime.Add(minInterval).Before(now)) + if !shouldRun { + return false, nil + } + + // Conditionally insert or update the task entry + n, err := tx.Exec(` + INSERT INTO harmony_task_singletons (task_name, task_id, last_run_time) + VALUES ($1, $2, $3) + ON CONFLICT (task_name) DO UPDATE + SET task_id = COALESCE(harmony_task_singletons.task_id, $2), + last_run_time = $3 + WHERE harmony_task_singletons.task_id IS NULL + `, taskName, taskID, now) + if err != nil { + return false, err + } + return n > 0, nil + }) + return nil + }) +} diff --git a/lib/passcall/every.go b/lib/passcall/every.go new file mode 100644 index 000000000..f39543063 --- /dev/null +++ b/lib/passcall/every.go @@ -0,0 +1,28 @@ +package passcall + +import ( + "sync" + "time" +) + +// Every is a helper function that will call the provided callback +// function at most once every `passEvery` duration. If the function is called +// more frequently than that, it will return nil and not call the callback. +func Every[P, R any](passInterval time.Duration, cb func(P) R) func(P) R { + var lastCall time.Time + var lk sync.Mutex + + return func(param P) R { + lk.Lock() + defer lk.Unlock() + + if time.Since(lastCall) < passInterval { + return *new(R) + } + + defer func() { + lastCall = time.Now() + }() + return cb(param) + } +} diff --git a/storage/paths/remote.go b/storage/paths/remote.go index abf8622e1..ab2754863 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -416,49 +416,55 @@ func (r *Remote) FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, er } for _, urlStr := range si.URLs { - rl, err := url.Parse(urlStr) + out, err := r.StatUrl(ctx, urlStr, id) if err != nil { - log.Warnw("failed to parse URL", "url", urlStr, "error", err) - continue // Try the next URL + log.Warnw("stat url failed", "url", urlStr, "error", err) + continue } - rl.Path = gopath.Join(rl.Path, "stat", string(id)) - - req, err := http.NewRequest("GET", rl.String(), nil) - if err != nil { - log.Warnw("creating request failed", "url", rl.String(), "error", err) - continue // Try the next URL - } - req.Header = r.auth - req = req.WithContext(ctx) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Warnw("request failed", "url", rl.String(), "error", err) - continue // Try the next URL - } - - if resp.StatusCode == 200 { - var out fsutil.FsStat - if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { - _ = resp.Body.Close() - log.Warnw("decoding response failed", "url", rl.String(), "error", err) - continue // Try the next URL - } - _ = resp.Body.Close() - return out, nil // Successfully decoded, return the result - } - - // non-200 status code - b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging - log.Warnw("request to endpoint failed", "url", rl.String(), "statusCode", resp.StatusCode, "response", string(b)) - _ = resp.Body.Close() - // Continue to try the next URL, don't return here as we want to try all URLs + return out, nil } return fsutil.FsStat{}, xerrors.Errorf("all endpoints failed for remote storage %s", id) } +func (r *Remote) StatUrl(ctx context.Context, urlStr string, id storiface.ID) (fsutil.FsStat, error) { + rl, err := url.Parse(urlStr) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("parsing URL: %w", err) + } + + rl.Path = gopath.Join(rl.Path, "stat", string(id)) + + req, err := http.NewRequest("GET", rl.String(), nil) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("creating request failed: %w", err) + } + req.Header = r.auth + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fsutil.FsStat{}, xerrors.Errorf("do request: %w", err) + } + + if resp.StatusCode == 200 { + var out fsutil.FsStat + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + _ = resp.Body.Close() + return fsutil.FsStat{}, xerrors.Errorf("decoding response failed: %w", err) + } + _ = resp.Body.Close() + return out, nil // Successfully decoded, return the result + } + + // non-200 status code + b, _ := io.ReadAll(resp.Body) // Best-effort read the body for logging + _ = resp.Body.Close() + + return fsutil.FsStat{}, xerrors.Errorf("endpoint failed %s: %d %s", rl.String(), resp.StatusCode, string(b)) +} + func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { if len(r.limit) >= cap(r.limit) { log.Infof("Throttling remote read, %d already running", len(r.limit))