lpseal: Finalize

This commit is contained in:
Łukasz Magiera 2024-01-30 12:43:57 +01:00
parent ce12cc05f7
commit 4649d3b8d0
6 changed files with 174 additions and 2 deletions

View File

@ -77,7 +77,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
} }
if cfg.Subsystems.EnableSealSDRTrees { if cfg.Subsystems.EnableSealSDRTrees {
treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
activeTasks = append(activeTasks, treesTask) finalizeTask := lpseal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db)
activeTasks = append(activeTasks, treesTask, finalizeTask)
} }
if cfg.Subsystems.EnableSendPrecommitMsg { if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee) precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee)

View File

@ -51,6 +51,10 @@ create table sectors_sdr_pipeline (
porep_proof bytea, porep_proof bytea,
after_porep bool not null default false, after_porep bool not null default false,
-- Finalize (trim cache)
task_id_finalize bigint,
after_finalize bool not null default false,
-- Commit message sending -- Commit message sending
commit_msg_cid text, commit_msg_cid text,

View File

@ -110,6 +110,7 @@ type ProviderSubsystemsConfig struct {
// In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1). // In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1).
EnableSealSDRTrees bool EnableSealSDRTrees bool
SealSDRTreesMaxTasks int SealSDRTreesMaxTasks int
FinalizeMaxTasks int
// EnableSendPrecommitMsg enables the sending of precommit messages to the chain // EnableSendPrecommitMsg enables the sending of precommit messages to the chain
// from this lotus-provider instance. // from this lotus-provider instance.

View File

@ -274,3 +274,23 @@ func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof)
return json.Marshal(phase1Output) return json.Marshal(phase1Output)
} }
func (sb *SealCalls) LocalStorage(ctx context.Context) ([]storiface.StoragePath, error) {
return sb.sectors.localStore.Local(ctx)
}
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error {
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()
ssize, err := sector.ProofType.SectorSize()
if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil {
return xerrors.Errorf("clearing cache: %w", err)
}
return nil
}

View File

@ -25,6 +25,7 @@ const (
pollerPrecommitMsg pollerPrecommitMsg
pollerPoRep pollerPoRep
pollerCommitMsg pollerCommitMsg
pollerFinalize
numPollers numPollers
) )
@ -94,6 +95,9 @@ type pollTask struct {
PoRepProof []byte `db:"porep_proof"` PoRepProof []byte `db:"porep_proof"`
AfterPoRep bool `db:"after_porep"` AfterPoRep bool `db:"after_porep"`
TaskFinalize *int64 `db:"task_id_finalize"`
AfterFinalize bool `db:"after_finalize"`
TaskCommitMsg *int64 `db:"task_id_commit_msg"` TaskCommitMsg *int64 `db:"task_id_commit_msg"`
AfterCommitMsg bool `db:"after_commit_msg"` AfterCommitMsg bool `db:"after_commit_msg"`
@ -115,10 +119,11 @@ func (s *SealPoller) poll(ctx context.Context) error {
task_id_precommit_msg, after_precommit_msg, task_id_precommit_msg, after_precommit_msg,
after_precommit_msg_success, seed_epoch, after_precommit_msg_success, seed_epoch,
task_id_porep, porep_proof, after_porep, task_id_porep, porep_proof, after_porep,
task_id_finalize, after_finalize,
task_id_commit_msg, after_commit_msg, task_id_commit_msg, after_commit_msg,
after_commit_msg_success, after_commit_msg_success,
failed, failed_reason failed, failed_reason
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`) FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true or after_finalize != true`)
if err != nil { if err != nil {
return err return err
} }
@ -139,6 +144,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
s.pollStartPrecommitMsg(ctx, task) s.pollStartPrecommitMsg(ctx, task)
s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) s.mustPoll(s.pollPrecommitMsgLanded(ctx, task))
s.pollStartPoRep(ctx, task, ts) s.pollStartPoRep(ctx, task, ts)
s.pollStartFinalize(ctx, task, ts)
s.pollStartCommitMsg(ctx, task) s.pollStartCommitMsg(ctx, task)
s.mustPoll(s.pollCommitMsgLanded(ctx, task)) s.mustPoll(s.pollCommitMsgLanded(ctx, task))
} }
@ -195,6 +201,22 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type
} }
} }
func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) {
if s.pollers[pollerFinalize].IsSet() && task.AfterPoRep && task.TaskFinalize == nil {
s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
}
return true, nil
})
}
}
func (s *SealPoller) mustPoll(err error) { func (s *SealPoller) mustPoll(err error) {
if err != nil { if err != nil {
log.Errorw("poller operation failed", "error", err) log.Errorw("poller operation failed", "error", err)

View File

@ -0,0 +1,124 @@
package lpseal
import (
"context"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"golang.org/x/xerrors"
)
type FinalizeTask struct {
max int
sp *SealPoller
sc *lpffi.SealCalls
db *harmonydb.DB
}
func NewFinalizeTask(max int, sp *SealPoller, sc *lpffi.SealCalls, db *harmonydb.DB) *FinalizeTask {
return &FinalizeTask{
max: max,
sp: sp,
sc: sc,
db: db,
}
}
func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var task struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
RegSealProof int64 `db:"reg_seal_proof"`
}
ctx := context.Background()
err = f.db.Select(ctx, &task, `
select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID)
if err != nil {
return false, xerrors.Errorf("getting task: %w", err)
}
sector := storiface.SectorRef{
ID: abi.SectorID{
Miner: abi.ActorID(task.SpID),
Number: abi.SectorNumber(task.SectorNumber),
},
ProofType: abi.RegisteredSealProof(task.RegSealProof),
}
err = f.sc.FinalizeSector(ctx, sector)
if err != nil {
return false, xerrors.Errorf("finalizing sector: %w", err)
}
// set after_finalize
_, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID)
if err != nil {
return false, xerrors.Errorf("updating task: %w", err)
}
return true, nil
}
func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// query:
var tasks []struct {
TaskID harmonytask.TaskID `db:"task_id_finalize"`
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
StorageID string `db:"storage_id"`
}
if 4 != storiface.FTCache {
panic("storiface.FTCache != 4")
}
ctx := context.Background()
err := f.db.Select(ctx, &tasks, `
select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p
inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num
where task_id_finalize in ($1) and l.sector_filetype=4`, ids)
if err != nil {
return nil, xerrors.Errorf("getting tasks: %w", err)
}
ls, err := f.sc.LocalStorage(ctx)
if err != nil {
return nil, xerrors.Errorf("getting local storage: %w", err)
}
for _, t := range tasks {
for _, l := range ls {
if string(l.ID) == t.StorageID {
return &t.TaskID, nil
}
}
}
return nil, nil
}
func (f *FinalizeTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: f.max,
Name: "Finalize",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Ram: 100 << 20,
},
MaxFailures: 10,
}
}
func (f *FinalizeTask) Adder(taskFunc harmonytask.AddTaskFunc) {
f.sp.pollers[pollerFinalize].Set(taskFunc)
}
var _ harmonytask.TaskInterface = &FinalizeTask{}