From e93a3e0d4a6c0f25c3494ac5976521a9464afb35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 11 Mar 2024 21:35:16 +0100 Subject: [PATCH] lppiece: GC task --- cmd/lotus-provider/tasks/tasks.go | 3 +- .../harmonydb/sql/20240228-piece-park.sql | 2 + provider/lpffi/piece_funcs.go | 4 + provider/lppiece/task_cleanup_piece.go | 130 ++++++++++++++++++ 4 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 provider/lppiece/task_cleanup_piece.go diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 71e794545..309cebd01 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -75,7 +75,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task // Piece handling if cfg.Subsystems.EnableParkPiece { parkPieceTask := lppiece.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks) - activeTasks = append(activeTasks, parkPieceTask) + cleanupPieceTask := lppiece.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0) + activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask) } } diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index ebba68fae..b07fd3973 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -9,6 +9,8 @@ create table parked_pieces ( complete boolean not null default false, task_id bigint default null, + cleanup_task_id bigint default null, + foreign key (task_id) references harmony_task (id) on delete set null ); diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go index 986d0fe4d..9ffc0b3d1 100644 --- a/provider/lpffi/piece_funcs.go +++ b/provider/lpffi/piece_funcs.go @@ -69,3 +69,7 @@ func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumb func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) { return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece) } + +func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error { + return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil) +} diff --git a/provider/lppiece/task_cleanup_piece.go b/provider/lppiece/task_cleanup_piece.go new file mode 100644 index 000000000..d7f24037a --- /dev/null +++ b/provider/lppiece/task_cleanup_piece.go @@ -0,0 +1,130 @@ +package lppiece + +import ( + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type CleanupPieceTask struct { + max int + db *harmonydb.DB + sc *lpffi.SealCalls + + TF promise.Promise[harmonytask.AddTaskFunc] +} + +func NewCleanupPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *CleanupPieceTask { + pt := &CleanupPieceTask{ + db: db, + sc: sc, + + max: max, + } + go pt.pollCleanupTasks(context.Background()) + return pt +} + +func (c *CleanupPieceTask) pollCleanupTasks(ctx context.Context) { + for { + // select pieces with no refs and null cleanup_task_id + var pieceIDs []struct { + ID storiface.PieceNumber `db:"id"` + } + + err := c.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE cleanup_task_id IS NULL AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`) + if err != nil { + log.Errorf("failed to get parked pieces: %s", err) + time.Sleep(PieceParkPollInterval) + continue + } + + if len(pieceIDs) == 0 { + time.Sleep(PieceParkPollInterval) + continue + } + + for _, pieceID := range pieceIDs { + pieceID := pieceID + + // create a task for each piece + c.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + // update + n, err := tx.Exec(`UPDATE parked_pieces SET cleanup_task_id = $1 WHERE id = $2 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`, id, pieceID.ID) + if err != nil { + return false, xerrors.Errorf("updating parked piece: %w", err) + } + + // commit only if we updated the piece + return n > 0, nil + }) + } + } +} + +func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + // select by cleanup_task_id + var pieceID int64 + + err = c.db.QueryRow(ctx, "SELECT piece_id FROM parked_pieces WHERE cleanup_task_id = $1", taskID).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("query parked_piece: %w", err) + } + + // delete from parked_pieces where id = $1 where ref count = 0 + // note: we delete from the db first because that guarantees that the piece is no longer in use + // if storage delete fails, it will be retried later is other cleanup tasks + n, err := c.db.Exec(ctx, "DELETE FROM parked_pieces WHERE id = $1 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = $1) = 0", pieceID) + if err != nil { + return false, xerrors.Errorf("delete parked_piece: %w", err) + } + + if n == 0 { + return true, nil + } + + // remove from storage + err = c.sc.RemovePiece(ctx, storiface.PieceNumber(pieceID)) + if err != nil { + log.Errorw("remove piece", "piece_id", pieceID, "error", err) + } + + return true, nil +} + +func (c *CleanupPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + // the remove call runs on paths.Remote storage, so it doesn't really matter where it runs + + id := ids[0] + return &id, nil +} + +func (c *CleanupPieceTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: c.max, + Name: "DropPiece", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 0, + Ram: 64 << 20, + Storage: nil, + }, + MaxFailures: 10, + } +} + +func (c *CleanupPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) { + c.TF.Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &CleanupPieceTask{}