From 4649d3b8d0d538646899081cde2d22ccfc52749a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Jan 2024 12:43:57 +0100 Subject: [PATCH] lpseal: Finalize --- cmd/lotus-provider/tasks/tasks.go | 3 +- .../harmonydb/sql/20231217-sdr-pipeline.sql | 4 + node/config/types.go | 1 + provider/lpffi/sdr_funcs.go | 20 +++ provider/lpseal/poller.go | 24 +++- provider/lpseal/task_finalize.go | 124 ++++++++++++++++++ 6 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 provider/lpseal/task_finalize.go diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index d4332843f..2b161ed0c 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -77,7 +77,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } if cfg.Subsystems.EnableSealSDRTrees { treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) - activeTasks = append(activeTasks, treesTask) + finalizeTask := lpseal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db) + activeTasks = append(activeTasks, treesTask, finalizeTask) } if cfg.Subsystems.EnableSendPrecommitMsg { precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee) diff --git a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index 43c0c8498..b52670e71 100644 --- a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -51,6 +51,10 @@ create table sectors_sdr_pipeline ( porep_proof bytea, after_porep bool not null default false, + -- Finalize (trim cache) + task_id_finalize bigint, + after_finalize bool not null default false, + -- Commit message sending commit_msg_cid text, diff --git a/node/config/types.go b/node/config/types.go index 05ed8786f..d64ea387c 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -110,6 +110,7 @@ type ProviderSubsystemsConfig struct { // In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1). EnableSealSDRTrees bool SealSDRTreesMaxTasks int + FinalizeMaxTasks int // EnableSendPrecommitMsg enables the sending of precommit messages to the chain // from this lotus-provider instance. diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index ec8c7f906..dd582b016 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -274,3 +274,23 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) return json.Marshal(phase1Output) } + +func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) { + return sb.sectors.localStore.Local(ctx) +} + +func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error { + paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) + if err != nil { + return xerrors.Errorf("acquiring sector paths: %w", err) + } + defer releaseSector() + + ssize, err := sector.ProofType.SectorSize() + + if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil { + return xerrors.Errorf("clearing cache: %w", err) + } + + return nil +} diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index a51e7d859..d19fb1940 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -25,6 +25,7 @@ const ( pollerPrecommitMsg pollerPoRep pollerCommitMsg + pollerFinalize numPollers ) @@ -94,6 +95,9 @@ type pollTask struct { PoRepProof []byte `db:"porep_proof"` AfterPoRep bool `db:"after_porep"` + TaskFinalize *int64 `db:"task_id_finalize"` + AfterFinalize bool `db:"after_finalize"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` AfterCommitMsg bool `db:"after_commit_msg"` @@ -115,10 +119,11 @@ func (s *SealPoller) poll(ctx context.Context) error { task_id_precommit_msg, after_precommit_msg, after_precommit_msg_success, seed_epoch, task_id_porep, porep_proof, after_porep, + task_id_finalize, after_finalize, task_id_commit_msg, after_commit_msg, after_commit_msg_success, failed, failed_reason - FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`) + FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true or after_finalize != true`) if err != nil { return err } @@ -139,6 +144,7 @@ func (s *SealPoller) poll(ctx context.Context) error { s.pollStartPrecommitMsg(ctx, task) s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) s.pollStartPoRep(ctx, task, ts) + s.pollStartFinalize(ctx, task, ts) s.pollStartCommitMsg(ctx, task) s.mustPoll(s.pollCommitMsgLanded(ctx, task)) } @@ -195,6 +201,22 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type } } +func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { + if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && task.TaskFinalize == nil { + s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + func (s *SealPoller) mustPoll(err error) { if err != nil { log.Errorw("poller operation failed", "error", err) diff --git a/provider/lpseal/task_finalize.go b/provider/lpseal/task_finalize.go new file mode 100644 index 000000000..681fe8b26 --- /dev/null +++ b/provider/lpseal/task_finalize.go @@ -0,0 +1,124 @@ +package lpseal + +import ( + "context" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "golang.org/x/xerrors" +) + +type FinalizeTask struct { + max int + sp *SealPoller + sc *lpffi.SealCalls + db *harmonydb.DB +} + +func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb.DB) *FinalizeTask { + return &FinalizeTask{ + max: max, + sp: sp, + sc: sc, + db: db, + } +} + +func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + var task struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + RegSealProof int64 `db:"reg_seal_proof"` + } + + ctx := context.Background() + + err = f.db.Select(ctx, &task, ` + select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID) + if err != nil { + return false, xerrors.Errorf("getting task: %w", err) + } + + sector := storiface.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(task.SpID), + Number: abi.SectorNumber(task.SectorNumber), + }, + ProofType: abi.RegisteredSealProof(task.RegSealProof), + } + + err = f.sc.FinalizeSector(ctx, sector) + if err != nil { + return false, xerrors.Errorf("finalizing sector: %w", err) + } + + // set after_finalize + _, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID) + if err != nil { + return false, xerrors.Errorf("updating task: %w", err) + } + + return true, nil +} + +func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + // query: + + var tasks []struct { + TaskID harmonytask.TaskID `db:"task_id_finalize"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + StorageID string `db:"storage_id"` + } + + if 4 != storiface.FTCache { + panic("storiface.FTCache != 4") + } + + ctx := context.Background() + + err := f.db.Select(ctx, &tasks, ` + select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p + inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num + where task_id_finalize in ($1) and l.sector_filetype=4`, ids) + if err != nil { + return nil, xerrors.Errorf("getting tasks: %w", err) + } + + ls, err := f.sc.LocalStorage(ctx) + if err != nil { + return nil, xerrors.Errorf("getting local storage: %w", err) + } + + for _, t := range tasks { + for _, l := range ls { + if string(l.ID) == t.StorageID { + return &t.TaskID, nil + } + } + } + + return nil, nil +} + +func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: f.max, + Name: "Finalize", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 0, + Ram: 100 << 20, + }, + MaxFailures: 10, + } +} + +func (f *FinalizeTask) Adder(taskFunc harmonytask.AddTaskFunc) { + f.sp.pollers[pollerFinalize].Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &FinalizeTask{}