lpseal: Wire up sdr task

This commit is contained in:
Łukasz Magiera 2023-12-19 12:16:38 +01:00
parent 4eb7bc91d9
commit 5d26c3a9dc
7 changed files with 117 additions and 18 deletions

View File

@ -3,6 +3,8 @@ package tasks
import (
"context"
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/provider/lpseal"
logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
@ -25,6 +27,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
as := dependencies.As
maddrs := dependencies.Maddrs
stor := dependencies.Stor
lstor := dependencies.LocalStore
si := dependencies.Si
var activeTasks []harmonytask.TaskInterface
@ -35,6 +38,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
///// Task Selection
///////////////////////////////////////////////////////////////////////
{
// PoSt
if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender,
@ -50,6 +54,25 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
activeTasks = append(activeTasks, winPoStTask)
}
}
{
// Sealing
hasAnySealingTask := cfg.Subsystems.EnableSealSDR
var sp *lpseal.SealPoller
var slr *lpffi.SealCalls
if hasAnySealingTask {
sp = lpseal.NewPoller(db)
go sp.RunPoller(ctx)
slr = lpffi.NewSealCalls(stor, lstor, si)
}
if cfg.Subsystems.EnableSealSDR {
sdrTask := lpseal.NewSDRTask(full, db, sp, slr, cfg.Subsystems.SealSDRMaxTasks)
activeTasks = append(activeTasks, sdrTask)
}
}
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))

View File

@ -45,3 +45,9 @@ func (p *Promise[T]) Val(ctx context.Context) T {
return val
}
}
func (p *Promise[T]) IsSet() bool {
p.mu.Lock()
defer p.mu.Unlock()
return p.done != nil
}

View File

@ -101,6 +101,11 @@ type ProviderSubsystemsConfig struct {
EnableWebGui bool
// The address that should listen for Web GUI requests.
GuiAddress string
// EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation
// creating layers. In lotus-miner this was run as part of PreCommit1.
EnableSealSDR bool
SealSDRMaxTasks int
}
type DAGStoreConfig struct {

View File

@ -5,23 +5,72 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)
var log = logging.Logger("lpffi")
/*
type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error)
type ExternalSealer struct {
PreCommit2 ExternPrecommit2
type ExternalSealer struct {
PreCommit2 ExternPrecommit2
}
*/
type SealCalls struct {
sectors *storageProvider
/*// externCalls cointain overrides for calling alternative sealing logic
externCalls ExternalSealer*/
}
type SealCalls struct {
sectors ffiwrapper.SectorProvider
func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls {
return &SealCalls{
sectors: &storageProvider{
storage: st,
localStore: ls,
sindex: si,
},
}
}
// externCalls cointain overrides for calling alternative sealing logic
externCalls ExternalSealer
type storageProvider struct {
storage paths.Store
localStore *paths.Local
sindex paths.SectorIndex
}
func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
return storiface.SectorPaths{}, nil, err
}
releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil {
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
}
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() {
releaseStorage()
for _, fileType := range storiface.PathTypes {
if fileType&allocate == 0 {
continue
}
sid := storiface.PathByType(storageIDs, fileType)
if err := l.sindex.StorageDeclareSector(ctx, storiface.ID(sid), sector.ID, fileType, true); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
}, nil
}
func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {

View File

@ -26,16 +26,24 @@ type SealPoller struct {
pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc]
}
func (s *SealPoller) RunPoller(ctx context.Context) error {
func NewPoller(db *harmonydb.DB) *SealPoller {
return &SealPoller{
db: db,
}
}
func (s *SealPoller) RunPoller(ctx context.Context) {
ticker := time.NewTicker(sealPollerInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
return
case <-ticker.C:
s.poll(ctx)
if err := s.poll(ctx); err != nil {
log.Errorf("polling sdr sector pipeline: %w", err)
}
}
}
}
@ -86,7 +94,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
continue
}
if task.TaskSDR == nil {
if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3`, id, task.SpID, task.SectorNumber)
if err != nil {

View File

@ -32,6 +32,16 @@ type SDRTask struct {
maxSDR int
}
func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *lpffi.SealCalls, maxSDR int) *SDRTask {
return &SDRTask{
api: api,
db: db,
sp: sp,
sc: sc,
maxSDR: maxSDR,
}
}
func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

View File

@ -28,8 +28,6 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache, storiface.FTUpdate, storiface.FTUpdateCache}
type WorkerConfig struct {
TaskTypes []sealtasks.TaskType
NoSwap bool
@ -167,7 +165,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
return paths, func() {
releaseStorage()
for _, fileType := range pathTypes {
for _, fileType := range storiface.PathTypes {
if fileType&allocate == 0 {
continue
}
@ -180,16 +178,16 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
}
func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error) {
return func(l *LocalWorker) (storiface.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, opts...)
}
}
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
}
type ReturnType string
const (