From 5d26c3a9dcc77c6a263bbdd7b9193b98aab9bafe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 19 Dec 2023 12:16:38 +0100 Subject: [PATCH] lpseal: Wire up sdr task --- cmd/lotus-provider/tasks/tasks.go | 23 +++++++++++ lib/promise/promise.go | 6 +++ node/config/types.go | 5 +++ provider/lpffi/sdr_funcs.go | 63 +++++++++++++++++++++++++++---- provider/lpseal/poller.go | 16 ++++++-- provider/lpseal/task_sdr.go | 10 +++++ storage/sealer/worker_local.go | 12 +++--- 7 files changed, 117 insertions(+), 18 deletions(-) diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 2c4cd58bf..418031157 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -3,6 +3,8 @@ package tasks import ( "context" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/provider/lpseal" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -25,6 +27,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task as := dependencies.As maddrs := dependencies.Maddrs stor := dependencies.Stor + lstor := dependencies.LocalStore si := dependencies.Si var activeTasks []harmonytask.TaskInterface @@ -35,6 +38,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task ///// Task Selection /////////////////////////////////////////////////////////////////////// { + // PoSt if cfg.Subsystems.EnableWindowPost { wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, @@ -50,6 +54,25 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task activeTasks = append(activeTasks, winPoStTask) } } + + { + // Sealing + hasAnySealingTask := cfg.Subsystems.EnableSealSDR + + var sp *lpseal.SealPoller + var slr *lpffi.SealCalls + if hasAnySealingTask { + sp = lpseal.NewPoller(db) + go sp.RunPoller(ctx) + + slr = lpffi.NewSealCalls(stor, lstor, si) + } + + if cfg.Subsystems.EnableSealSDR { + sdrTask := lpseal.NewSDRTask(full, db, sp, slr, cfg.Subsystems.SealSDRMaxTasks) + activeTasks = append(activeTasks, sdrTask) + } + } log.Infow("This lotus_provider instance handles", "miner_addresses", maddrs, "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) diff --git a/lib/promise/promise.go b/lib/promise/promise.go index 9b6a5e2b0..02e917ca1 100644 --- a/lib/promise/promise.go +++ b/lib/promise/promise.go @@ -45,3 +45,9 @@ func (p *Promise[T]) Val(ctx context.Context) T { return val } } + +func (p *Promise[T]) IsSet() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.done != nil +} diff --git a/node/config/types.go b/node/config/types.go index 208d85939..65fa4da02 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -101,6 +101,11 @@ type ProviderSubsystemsConfig struct { EnableWebGui bool // The address that should listen for Web GUI requests. GuiAddress string + + // EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation + // creating layers. In lotus-miner this was run as part of PreCommit1. + EnableSealSDR bool + SealSDRMaxTasks int } type DAGStoreConfig struct { diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index c7ff7064a..fc1748213 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -5,23 +5,72 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" commcid "github.com/filecoin-project/go-fil-commcid" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" ) +var log = logging.Logger("lpffi") + +/* type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) -type ExternalSealer struct { - PreCommit2 ExternPrecommit2 + type ExternalSealer struct { + PreCommit2 ExternPrecommit2 + } +*/ +type SealCalls struct { + sectors *storageProvider + + /*// externCalls cointain overrides for calling alternative sealing logic + externCalls ExternalSealer*/ } -type SealCalls struct { - sectors ffiwrapper.SectorProvider +func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls { + return &SealCalls{ + sectors: &storageProvider{ + storage: st, + localStore: ls, + sindex: si, + }, + } +} - // externCalls cointain overrides for calling alternative sealing logic - externCalls ExternalSealer +type storageProvider struct { + storage paths.Store + localStore *paths.Local + sindex paths.SectorIndex +} + +func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { + paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) + if err != nil { + return storiface.SectorPaths{}, nil, err + } + + releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal) + if err != nil { + return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) + } + + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) + + return paths, func() { + releaseStorage() + + for _, fileType := range storiface.PathTypes { + if fileType&allocate == 0 { + continue + } + + sid := storiface.PathByType(storageIDs, fileType) + if err := l.sindex.StorageDeclareSector(ctx, storiface.ID(sid), sector.ID, fileType, true); err != nil { + log.Errorf("declare sector error: %+v", err) + } + } + }, nil } func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error { diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index d299832ec..4747eb332 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -26,16 +26,24 @@ type SealPoller struct { pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc] } -func (s *SealPoller) RunPoller(ctx context.Context) error { +func NewPoller(db *harmonydb.DB) *SealPoller { + return &SealPoller{ + db: db, + } +} + +func (s *SealPoller) RunPoller(ctx context.Context) { ticker := time.NewTicker(sealPollerInterval) defer ticker.Stop() for { select { case <-ctx.Done(): - return nil + return case <-ticker.C: - s.poll(ctx) + if err := s.poll(ctx); err != nil { + log.Errorf("polling sdr sector pipeline: %w", err) + } } } } @@ -86,7 +94,7 @@ func (s *SealPoller) poll(ctx context.Context) error { continue } - if task.TaskSDR == nil { + if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index 1b5268cab..928344069 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -32,6 +32,16 @@ type SDRTask struct { maxSDR int } +func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *lpffi.SealCalls, maxSDR int) *SDRTask { + return &SDRTask{ + api: api, + db: db, + sp: sp, + sc: sc, + maxSDR: maxSDR, + } +} + func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 7fc494955..417a15e62 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -28,8 +28,6 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache, storiface.FTUpdate, storiface.FTUpdateCache} - type WorkerConfig struct { TaskTypes []sealtasks.TaskType NoSwap bool @@ -167,7 +165,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor return paths, func() { releaseStorage() - for _, fileType := range pathTypes { + for _, fileType := range storiface.PathTypes { if fileType&allocate == 0 { continue } @@ -180,16 +178,16 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor }, nil } +func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { + return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype) +} + func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error) { return func(l *LocalWorker) (storiface.Storage, error) { return ffiwrapper.New(&localWorkerPathProvider{w: l}, opts...) } } -func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { - return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype) -} - type ReturnType string const (