package gc

import (
	"context"
	"strings"
	"sync"
	"time"

	logging "github.com/ipfs/go-log/v2"
	"github.com/samber/lo"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/lib/harmony/resources"
	"github.com/filecoin-project/lotus/lib/result"
	"github.com/filecoin-project/lotus/storage/paths"
	"github.com/filecoin-project/lotus/storage/sealer/fsutil"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("curiogc")

const StorageEndpointGCInterval = 21 * time.Minute
const StorageEndpointDeadTime = StorageEndpointGCInterval * 6 // ~2h
const MaxParallelEndpointChecks = 32
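// StorageEndpointGC is a harmonytask task that periodically pings every URL
// registered for each storage path, records the results in the
// sector_path_url_liveness table, and prunes URLs (and, once a path has no
// URLs left, the path itself together with its sector references) that have
// been dead for longer than StorageEndpointDeadTime.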
type StorageEndpointGC struct {
	si     *paths.DBIndex
	remote *paths.Remote
	db     *harmonydb.DB
}

func NewStorageEndpointGC(si *paths.DBIndex, remote *paths.Remote, db *harmonydb.DB) *StorageEndpointGC {
	return &StorageEndpointGC{
		si:     si,
		remote: remote,
		db:     db,
	}
}
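// Do performs a single GC pass: it pings all storage endpoints, updates the
// liveness table, and removes endpoints and paths that have been down for
// longer than StorageEndpointDeadTime.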
func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
	/*
		1. Get all storage paths + urls (endpoints)
		2. Ping each url, record results
		3. Update sector_path_url_liveness with success/failure
		4.1 If a URL was consistently down for StorageEndpointDeadTime, remove it from the storage_path table
		4.2 Remove storage paths with no URLs remaining
		4.2.1 in the same transaction remove sector refs to the dead path
	*/

	ctx := context.Background()

	var pathRefs []struct {
		StorageID     storiface.ID `db:"storage_id"`
		Urls          string       `db:"urls"`
		LastHeartbeat *time.Time   `db:"last_heartbeat"`
	}

	err = s.db.Select(ctx, &pathRefs, `SELECT storage_id, urls, last_heartbeat FROM storage_path`)
	if err != nil {
		return false, xerrors.Errorf("getting path metadata: %w", err)
	}

	type pingResult struct {
		storageID storiface.ID
		url       string

		res result.Result[fsutil.FsStat]
	}

	var pingResults []pingResult
	var resultLk sync.Mutex
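	// resultThrottle acts as a counting semaphore: each ping goroutine takes a
	// slot before starting and releases it when done, capping concurrent
	// endpoint checks at MaxParallelEndpointChecks.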
	var resultThrottle = make(chan struct{}, MaxParallelEndpointChecks)

	for _, pathRef := range pathRefs {
		pathRef := pathRef
		urls := strings.Split(pathRef.Urls, paths.URLSeparator)

		for _, url := range urls {
			url := url

			select {
			case resultThrottle <- struct{}{}:
			case <-ctx.Done():
				return false, ctx.Err()
			}

			go func() {
				defer func() {
					<-resultThrottle
				}()

				st, err := s.remote.StatUrl(ctx, url, pathRef.StorageID)

				res := pingResult{
					storageID: pathRef.StorageID,
					url:       url,
					res:       result.Wrap(st, err),
				}

				resultLk.Lock()
				pingResults = append(pingResults, res)
				resultLk.Unlock()
			}()
		}
	}

	// Wait for all pings to finish
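	// Filling the throttle channel to capacity can only succeed once every ping
	// goroutine has released its slot, so this loop acts as a barrier.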
	for i := 0; i < MaxParallelEndpointChecks; i++ {
		select {
		case resultThrottle <- struct{}{}:
		case <-ctx.Done():
			return false, ctx.Err()
		}
	}

	// Update the liveness table

	/*
		create table sector_path_url_liveness (
			storage_id text,
			url text,

			last_checked timestamp not null,
			last_live timestamp,
			last_dead timestamp,
			last_dead_reason text,

			primary key (storage_id, url),

			foreign key (storage_id) references storage_path (storage_id) on delete cascade
		)
	*/

	currentTime := time.Now().UTC()
	committed, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		for _, pingResult := range pingResults {
			var lastLive, lastDead, lastDeadReason interface{}
			if pingResult.res.Error == nil {
				lastLive = currentTime.UTC()
				lastDead = nil
				lastDeadReason = nil
			} else {
				lastLive = nil
				lastDead = currentTime.UTC()
				lastDeadReason = pingResult.res.Error.Error()
			}

			// This statement upserts the liveness data for a URL in the `sector_path_url_liveness` table.
			//
			// On conflict, where the same `storage_id` and `url` are found:
			// - last_checked is always updated to the current timestamp.
			// - last_live is updated to the new `last_live` if it is not null; otherwise it retains the existing value.
			// - last_dead is updated to the new `last_dead` in two cases:
			//   1. The existing `last_dead` is null (this is the first recorded failure).
			//   2. A live check was recorded after the most recent dead timestamp, meaning the endpoint came back up before this new failure.
			//   Otherwise it retains the existing value.
			// - last_dead_reason is updated like `last_live`, using COALESCE to prefer the new reason when one is provided.
			_, err := tx.Exec(`
				INSERT INTO sector_path_url_liveness (storage_id, url, last_checked, last_live, last_dead, last_dead_reason)
				VALUES ($1, $2, $3, $4, $5, $6)
				ON CONFLICT (storage_id, url) DO UPDATE
				SET last_checked = EXCLUDED.last_checked,
					last_live = COALESCE(EXCLUDED.last_live, sector_path_url_liveness.last_live),
					last_dead = CASE
						WHEN sector_path_url_liveness.last_dead IS NULL THEN EXCLUDED.last_dead
						WHEN sector_path_url_liveness.last_dead IS NOT NULL AND sector_path_url_liveness.last_live > sector_path_url_liveness.last_dead THEN EXCLUDED.last_dead
						ELSE sector_path_url_liveness.last_dead
					END,
					last_dead_reason = COALESCE(EXCLUDED.last_dead_reason, sector_path_url_liveness.last_dead_reason)
				`, pingResult.storageID, pingResult.url, currentTime, lastLive, lastDead, lastDeadReason)
			if err != nil {
				return false, xerrors.Errorf("updating liveness data: %w", err)
			}
		}

		return true, nil
	}, harmonydb.OptionRetry())
	if err != nil {
		return false, xerrors.Errorf("sector_path_url_liveness update: %w", err)
	}
	if !committed {
		return false, xerrors.Errorf("sector_path_url_liveness update: transaction didn't commit")
	}
	///////
	// Now we do the actual database cleanup
	if !stillOwned() {
		return false, xerrors.Errorf("task no longer owned")
	}

	committed, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
		// Identify URLs that are consistently down
		var deadURLs []struct {
			StorageID storiface.ID
			URL       string
		}
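		// A URL is treated as dead when its most recent failure is newer than its
		// most recent success (or it never succeeded) and that failure occurred
		// more than StorageEndpointDeadTime ago.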
		err = tx.Select(&deadURLs, `
			SELECT storage_id, url FROM sector_path_url_liveness
			WHERE last_dead > COALESCE(last_live, '1970-01-01') AND last_dead < $1
		`, currentTime.Add(-StorageEndpointDeadTime).UTC())
		if err != nil {
			return false, xerrors.Errorf("selecting dead URLs: %w", err)
		}

		log.Debugw("dead urls", "dead_urls", deadURLs)

		// Remove dead URLs from storage_path entries and handle path cleanup
		for _, du := range deadURLs {
			// Fetch the current URLs for the storage path
			var URLs string
			err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs)
			if err != nil {
				return false, xerrors.Errorf("fetching storage paths: %w", err)
			}

			// Filter out the dead URL using lo.Reject and prepare the updated list
			urls := strings.Split(URLs, paths.URLSeparator)
			urls = lo.Reject(urls, func(u string, _ int) bool {
				return u == du.URL
			})

			log.Debugw("filtered urls", "urls", urls, "dead_url", du.URL, "storage_id", du.StorageID)

			if len(urls) == 0 {
				// If no URLs are left, remove the storage path entirely
				_, err = tx.Exec("DELETE FROM storage_path WHERE storage_id = $1", du.StorageID)
				if err != nil {
					return false, xerrors.Errorf("deleting storage path: %w", err)
				}
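				// The path is gone, so also drop the sector references that pointed
				// at it (step 4.2.1 in the plan above).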
_, err = tx.Exec("DELETE FROM sector_location WHERE storage_id = $1", du.StorageID)
|
|
if err != nil {
|
|
return false, xerrors.Errorf("deleting sector locations: %w", err)
|
|
}
|
|
} else {
|
|
// Update the storage path with the filtered URLs
|
|
newURLs := strings.Join(urls, paths.URLSeparator)
|
|
_, err = tx.Exec("UPDATE storage_path SET urls = $1 WHERE storage_id = $2", newURLs, du.StorageID)
|
|
if err != nil {
|
|
return false, xerrors.Errorf("updating storage path urls: %w", err)
|
|
}
|
|
// Remove sector_path_url_liveness entry
|
|
_, err = tx.Exec("DELETE FROM sector_path_url_liveness WHERE storage_id = $1 AND url = $2", du.StorageID, du.URL)
|
|
if err != nil {
|
|
return false, xerrors.Errorf("deleting sector_path_url_liveness entry: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
}, harmonydb.OptionRetry())
|
|
if err != nil {
|
|
return false, xerrors.Errorf("removing dead URLs and cleaning storage paths: %w", err)
|
|
}
|
|
if !committed {
|
|
return false, xerrors.Errorf("transaction for removing dead URLs and cleaning paths did not commit")
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
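// CanAccept always accepts the first offered task; the GC pass is lightweight.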
func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	id := ids[0]
	return &id, nil
}

func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails {
	return harmonytask.TaskTypeDetails{
		Max:  1,
		Name: "StorageMetaGC",
		Cost: resources.Resources{
			Cpu: 1,
			Ram: 64 << 20,
			Gpu: 0,
		},
		IAmBored: harmonytask.SingletonTaskAdder(StorageEndpointGCInterval, s),
	}
}

func (s *StorageEndpointGC) Adder(taskFunc harmonytask.AddTaskFunc) {
	// lazy endpoint, added when bored
}

var _ harmonytask.TaskInterface = &StorageEndpointGC{}