From 7c292905a833ad03fc63c38d12311d60706bef8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 18 Apr 2024 12:56:22 +0200 Subject: [PATCH] curio: Make sector index GC work --- curiosrc/gc/storage_endpoint_gc.go | 24 +++++++++++++++-------- lib/harmony/harmonytask/singleton_task.go | 2 +- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go index 1dd26ee03..191945773 100644 --- a/curiosrc/gc/storage_endpoint_gc.go +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -2,7 +2,6 @@ package gc import ( "context" - "os" "strings" "sync" "time" @@ -149,13 +148,27 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool lastDeadReason = pingResult.res.Error.Error() } + // This function updates the liveness data for a URL in the `sector_path_url_liveness` table. + // + // On conflict, where the same `storage_id` and `url` are found: + // - last_checked is always updated to the current timestamp. + // - last_live is updated to the new `last_live` if it is not null; otherwise, it retains the existing value. + // - last_dead is conditionally updated based on two criteria: + // 1. It is set to the new `last_dead` if the existing `last_dead` is null (indicating this is the first recorded failure). + // 2. It is updated to the new `last_dead` if there has been a live instance recorded after the most recent dead timestamp, indicating the resource was alive again before this new failure. + // 3. It retains the existing value if none of the above conditions are met. + // - last_dead_reason is updated similarly to `last_live`, using COALESCE to prefer the new reason if it's provided. _, err := tx.Exec(` INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (storage_id, url) DO UPDATE SET last_checked = EXCLUDED.last_checked, last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live), - last_dead = COALESCE(EXCLUDED.last_dead, sector_path_url_liveness.last_dead), + last_dead = CASE + WHEN sector_path_url_liveness.last_dead IS NULL THEN EXCLUDED.last_dead + WHEN sector_path_url_liveness.last_dead IS NOT NULL AND sector_path_url_liveness.last_live > sector_path_url_liveness.last_dead THEN EXCLUDED.last_dead + ELSE sector_path_url_liveness.last_dead + END, last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason) `, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason) if err != nil { @@ -186,7 +199,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool } err = tx.Select(&deadURLs, ` SELECT storage_id, url FROM sector_path_url_liveness - WHERE last_dead > last_live AND last_dead < $1 + WHERE last_dead > COALESCE(last_live, '1970-01-01') AND last_dead < $1 `, currentTime.Add(-StorageEndpointDeadTime).UTC()) if err != nil { return false, xerrors.Errorf("selecting dead URLs: %w", err) @@ -211,11 +224,6 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID) - if os.Getenv("CURIO_STORAGE_META_GC_DRYRUN") != "no" { // todo drop this after testing - log.Debugw("dryrun: not updating storage path", "storage_id", du.StorageID, "urls", urls, "dead_url", du.URL, "current_urls", URLs, "dead_urls", deadURLs) - continue - } - if len(urls) == 0 { // If no URLs left, remove the storage path entirely _, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID) diff --git a/lib/harmony/harmonytask/singleton_task.go b/lib/harmony/harmonytask/singleton_task.go index a7b2d45a2..720033410 100644 --- a/lib/harmony/harmonytask/singleton_task.go +++ b/lib/harmony/harmonytask/singleton_task.go @@ -26,7 +26,7 @@ func SingletonTaskAdder(minInterval time.Duration, task TaskInterface) func(AddT } } - now := time.Now() + now := time.Now().UTC() // Determine if the task should run based on the absence of a record or outdated last_run_time shouldRun := err == pgx.ErrNoRows || (existingTaskID == nil && lastRunTime.Add(minInterval).Before(now)) if !shouldRun {