From 5f3675a53673889fec4daac2b0c844bc9d444880 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Tue, 27 Feb 2024 20:23:42 +0100
Subject: [PATCH] lpseal: SDR Storage reservations

---
 provider/lpffi/sdr_funcs.go    |  63 ++++++---
 provider/lpffi/task_storage.go | 241 +++++++++++++++++++++++++++++++++
 provider/lpseal/task_sdr.go    |  29 +++-
 3 files changed, 310 insertions(+), 23 deletions(-)
 create mode 100644 provider/lpffi/task_storage.go

diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go
index daf7d5381..05a7bbd80 100644
--- a/provider/lpffi/sdr_funcs.go
+++ b/provider/lpffi/sdr_funcs.go
@@ -11,6 +11,7 @@ import (
 	"github.com/KarpelesLab/reflink"
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
+	"github.com/puzpuzpuz/xsync/v2"
 	"golang.org/x/xerrors"
 
 	ffi "github.com/filecoin-project/filecoin-ffi"
@@ -18,6 +19,7 @@ import (
 	"github.com/filecoin-project/go-state-types/abi"
 	proof2 "github.com/filecoin-project/go-state-types/proof"
 
+	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
 	"github.com/filecoin-project/lotus/provider/lpproof"
 	"github.com/filecoin-project/lotus/storage/paths"
 	"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
@@ -43,28 +45,51 @@ type SealCalls struct {
 func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls {
 	return &SealCalls{
 		sectors: &storageProvider{
-			storage:    st,
-			localStore: ls,
-			sindex:     si,
+			storage:             st,
+			localStore:          ls,
+			sindex:              si,
+			storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
 		},
 	}
 }
 
 type storageProvider struct {
-	storage    paths.Store
-	localStore *paths.Local
-	sindex     paths.SectorIndex
+	storage             paths.Store
+	localStore          *paths.Local
+	sindex              paths.SectorIndex
+	storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
 }
 
-func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
-	paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
-	if err != nil {
-		return storiface.SectorPaths{}, nil, err
-	}
+func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
+	var paths, storageIDs storiface.SectorPaths
+	var releaseStorage func()
 
-	releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
-	if err != nil {
-		return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
+	var ok bool
+	var resv *StorageReservation
+	if taskID != nil {
+		resv, ok = l.storageReservations.Load(*taskID)
+	}
+	if ok {
+		if resv.Alloc != allocate || resv.Existing != existing {
+			// this should never happen; it can only mean the task definition is wrong
+			return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
+		}
+
+		log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)
+
+		paths = resv.Paths
+		releaseStorage = resv.Release
+	} else {
+		var err error
+		paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
+		if err != nil {
+			return storiface.SectorPaths{}, nil, err
+		}
+
+		releaseStorage, err = 
l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal) + if err != nil { + return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) + } } log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) @@ -85,8 +110,8 @@ func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.Se }, nil } -func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error { - paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing) +func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error { + paths, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing) if err != nil { return xerrors.Errorf("acquiring sector paths: %w", err) } @@ -120,7 +145,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size maybeUns := storiface.FTNone // todo sectors with data - paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, maybeUns, storiface.PathSealing) + paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, maybeUns, storiface.PathSealing) if err != nil { return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } @@ -135,7 +160,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) } - paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) + paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } @@ -331,7 +356,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector alloc = storiface.FTUnsealed } - sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, alloc, storiface.PathSealing) + sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, alloc, storiface.PathSealing) if err != nil { return xerrors.Errorf("acquiring sector paths: %w", err) } diff --git a/provider/lpffi/task_storage.go b/provider/lpffi/task_storage.go new file mode 100644 index 000000000..01d6671c7 --- /dev/null +++ b/provider/lpffi/task_storage.go @@ -0,0 +1,241 @@ +package lpffi + +import ( + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/must" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type SectorRef struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` +} + +func (sr SectorRef) ID() abi.SectorID { + return abi.SectorID{ + Miner: abi.ActorID(sr.SpID), + Number: abi.SectorNumber(sr.SectorNumber), + } +} + +func (sr SectorRef) Ref() storiface.SectorRef { + return storiface.SectorRef{ + ID: sr.ID(), + ProofType: sr.RegSealProof, + } +} + +type TaskStorage struct { + 
sc *SealCalls
+
+	alloc, existing storiface.SectorFileType
+	ssize           abi.SectorSize
+	pathType        storiface.PathType
+
+	taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error)
+}
+
+type ReleaseStorageFunc func() // free storage reservation
+
+type StorageReservation struct {
+	SectorRef SectorRef
+	Release   ReleaseStorageFunc
+	Paths     storiface.SectorPaths
+	PathIDs   storiface.SectorPaths
+
+	Alloc, Existing storiface.SectorFileType
+}
+
+func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage {
+	return &TaskStorage{
+		sc:              sb,
+		alloc:           alloc,
+		existing:        existing,
+		ssize:           ssize,
+		pathType:        pathType,
+		taskToSectorRef: taskToSectorRef,
+	}
+}
+
+func (t *TaskStorage) HasCapacity() bool {
+	ctx := context.Background()
+
+	paths, err := t.sc.sectors.sindex.StorageBestAlloc(ctx, t.alloc, t.ssize, t.pathType)
+	if err != nil {
+		log.Errorf("finding best alloc in HasCapacity: %+v", err)
+		return false
+	}
+
+	local, err := t.sc.sectors.localStore.Local(ctx)
+	if err != nil {
+		log.Errorf("getting local storage: %+v", err)
+		return false
+	}
+
+	for _, path := range paths {
+		if t.pathType == storiface.PathStorage && !path.CanStore {
+			continue // we want to store, and this isn't a store path
+		}
+		if t.pathType == storiface.PathSealing && !path.CanSeal {
+			continue // we want to seal, and this isn't a seal path
+		}
+
+		// check if this path is on this node
+		var found bool
+		for _, storagePath := range local {
+			if storagePath.ID == path.ID {
+				found = true
+				break
+			}
+		}
+		if !found {
+			// this path isn't on this node
+			continue
+		}
+
+		// StorageBestAlloc already checks that there is enough space; not atomic like reserving space, but it's
+		// good enough for HasCapacity
+		return true
+	}
+
+	return false // no path found
+}
+
+func (t *TaskStorage) Claim(taskID int) error {
+	ctx := context.Background()
+
+	sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
+	if err != nil {
+		return xerrors.Errorf("getting sector ref: %w", err)
+	}
+
+	// take a storage write-lock on the sector
+	lkctx, cancel := context.WithCancel(ctx)
+
+	allocate := storiface.FTCache
+
+	lockAcquireTimeout := time.Second * 10
+	lockAcquireTimer := time.NewTimer(lockAcquireTimeout)
+
+	go func() {
+		defer cancel()
+
+		select {
+		case <-lockAcquireTimer.C:
+		case <-ctx.Done():
+		}
+	}()
+
+	if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, allocate); err != nil {
+		// timer will expire
+		return xerrors.Errorf("claim StorageLock: %w", err)
+	}
+
+	if !lockAcquireTimer.Stop() {
+		// timer expired, so lkctx is done, and that means the lock was acquired and dropped
+		return xerrors.Errorf("failed to acquire lock")
+	}
+	defer func() {
+		// make sure we release the sector lock
+		lockAcquireTimer.Reset(0)
+	}()
+
+	// find existing sector files anywhere in storage;
+	// if a non-local copy is found, return nil (no reservation) for now
+	s, err := t.sc.sectors.sindex.StorageFindSector(ctx, sectorRef.ID(), allocate, must.One(sectorRef.RegSealProof.SectorSize()), false)
+	if err != nil {
+		return xerrors.Errorf("claim StorageFindSector: %w", err)
+	}
+
+	lp, err := t.sc.sectors.localStore.Local(ctx)
+	if err != nil {
+		return err
+	}
+
+	// see if there are any non-local sector files in storage
+storageLoop:
+	for _, info := range s {
+		for _, l := range lp {
+			if l.ID == info.ID {
+				continue storageLoop // this file is local; check the next one
+			}
+		}
+		// TODO: Create a reservation for fetching; this will require quite a bit more refactoring, but for now we
+		// only care about new allocations
+		return nil
+	}
+
+	// acquire a path to make a reservation in
+	pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove)
+	if err != nil {
+		return err
+	}
+
+	// reserve the space
+	release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), allocate, pathIDs, storiface.FSOverheadSeal)
+	if err != nil {
+		return err
+	}
+
+	sres := &StorageReservation{
+		SectorRef: sectorRef,
+		Release:   release,
+		Paths:     pathsFs,
+		PathIDs:   pathIDs,
+
+		Alloc:    t.alloc,
+		Existing: t.existing,
+	}
+
+	t.sc.sectors.storageReservations.Store(harmonytask.TaskID(taskID), sres)
+
+	log.Debugw("claimed storage", "task_id", taskID, "sector", sectorRef.ID(), "paths", pathsFs)
+
+	// note: we drop the sector writelock on return; THAT IS INTENTIONAL. This code runs in CanAccept, which doesn't
+	// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
+	// run the job, and harmonytask is what ensures that only one SDR runs at a time.
+	return nil
+}
+
+func (t *TaskStorage) MarkComplete(taskID int) error {
+	// MarkComplete is ALWAYS called after the task is done or not scheduled.
+	// If Claim is called and returns without errors, MarkComplete with the same
+	// taskID is guaranteed to eventually be called.
+
+	sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
+	if err != nil {
+		return xerrors.Errorf("getting sector ref: %w", err)
+	}
+
+	sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
+	if !ok {
+		return xerrors.Errorf("no reservation found for task %d", taskID)
+	}
+
+	if sectorRef != sres.SectorRef {
+		return xerrors.Errorf("reservation sector ref doesn't match task sector ref: %+v != %+v", sectorRef, sres.SectorRef)
+	}
+
+	log.Debugw("marking storage complete", "task_id", taskID, "sector", sectorRef.ID(), "paths", sres.Paths)
+
+	// remove the reservation
+	t.sc.sectors.storageReservations.Delete(harmonytask.TaskID(taskID))
+
+	// release the reservation
+	sres.Release()
+
+	// note: this only frees the reservation; allocated sectors are declared in AcquireSector, which is aware of
+	// the reservation
+	return nil
+}
+
+var _ resources.Storage = &TaskStorage{}
diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go
index 68dca5bde..694ff6f46 100644
--- a/provider/lpseal/task_sdr.go
+++ b/provider/lpseal/task_sdr.go
@@ -146,7 +146,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
 	// Trees; After one retry, it should return the sector to the
 	// SDR stage; max number of retries should be configurable
 
-	err = s.sc.GenerateSDR(ctx, sref, ticket, commd)
+	err = 
s.sc.GenerateSDR(ctx, taskID, sref, ticket, commd) if err != nil { return false, xerrors.Errorf("generating sdr: %w", err) } @@ -194,13 +194,19 @@ func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEn } func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails { + ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size + if isDevnet { + ssize = abi.SectorSize(2 << 20) + } + res := harmonytask.TaskTypeDetails{ Max: s.max, Name: "SDR", Cost: resources.Resources{ // todo offset for prefetch? - Cpu: 4, // todo multicore sdr - Gpu: 0, - Ram: 54 << 30, + Cpu: 4, // todo multicore sdr + Gpu: 0, + Ram: 54 << 30, + Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing), }, MaxFailures: 2, Follows: nil, @@ -217,4 +223,19 @@ func (s *SDRTask) Adder(taskFunc harmonytask.AddTaskFunc) { s.sp.pollers[pollerSDR].Set(taskFunc) } +func (s *SDRTask) taskToSector(id harmonytask.TaskID) (lpffi.SectorRef, error) { + var refs []lpffi.SectorRef + + err := s.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_sdr = $1`, id) + if err != nil { + return lpffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) + } + + if len(refs) != 1 { + return lpffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) + } + + return refs[0], nil +} + var _ harmonytask.TaskInterface = &SDRTask{}
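
The reservation reuse in AcquireSector boils down to a check-the-map-first pattern: if the task claimed storage ahead of time, use those paths and that release function; otherwise acquire and reserve fresh. A minimal self-contained sketch of the same pattern, using a plain mutex-guarded map and stand-in types instead of the xsync map and the paths/storiface APIs from the patch:

package main

import (
	"fmt"
	"sync"
)

type TaskID int

// Reservation is a simplified stand-in for StorageReservation: the real
// struct carries storiface.SectorPaths and path IDs, not a single string.
type Reservation struct {
	Paths   string
	Release func()
}

type provider struct {
	mu           sync.Mutex
	reservations map[TaskID]*Reservation
}

// acquire returns pre-reserved paths when the task claimed storage earlier,
// and falls back to a fresh acquire+reserve otherwise (stand-in helpers).
func (p *provider) acquire(taskID *TaskID) (string, func(), error) {
	if taskID != nil {
		p.mu.Lock()
		resv, ok := p.reservations[*taskID]
		p.mu.Unlock()
		if ok {
			return resv.Paths, resv.Release, nil
		}
	}
	paths, err := freshAcquire()
	if err != nil {
		return "", nil, err
	}
	release, err := reserveSpace(paths)
	if err != nil {
		return "", nil, fmt.Errorf("reserving storage space: %w", err)
	}
	return paths, release, nil
}

func freshAcquire() (string, error)       { return "/seal/cache", nil }
func reserveSpace(string) (func(), error) { return func() {}, nil }

func main() {
	p := &provider{reservations: map[TaskID]*Reservation{}}
	id := TaskID(1)
	p.reservations[id] = &Reservation{Paths: "/reserved/cache", Release: func() {}}

	paths, release, err := p.acquire(&id)
	if err != nil {
		panic(err)
	}
	defer release()
	fmt.Println(paths) // prints /reserved/cache, not a freshly acquired path
}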
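
Claim bounds StorageLock with an explicit timer rather than context.WithTimeout because it needs to distinguish "lock acquired before the deadline" (Stop returns true) from "timer already fired" (Stop returns false, meaning lkctx was cancelled and the lock, even if briefly acquired, is already invalid). A self-contained sketch of that pattern, with a hypothetical tryLock standing in for sindex.StorageLock:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// tryLock stands in for sindex.StorageLock: it blocks until the lock is
// held or ctx is cancelled.
func tryLock(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend lock acquisition
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func claimWithTimeout(ctx context.Context) error {
	lkctx, cancel := context.WithCancel(ctx)

	timer := time.NewTimer(10 * time.Second)
	go func() {
		defer cancel()
		select {
		case <-timer.C: // timeout: cancel the pending lock attempt
		case <-ctx.Done():
		}
	}()

	if err := tryLock(lkctx); err != nil {
		return fmt.Errorf("lock: %w", err) // the goroutine still cleans up
	}

	if !timer.Stop() {
		// the timer already fired, so lkctx was cancelled: the lock may
		// have been acquired and immediately invalidated
		return errors.New("failed to acquire lock in time")
	}
	defer timer.Reset(0) // fire the timer on return so the goroutine exits

	// ... hold the lock only for the duration of this function ...
	return nil
}

func main() {
	fmt.Println(claimWithTimeout(context.Background())) // <nil>
}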
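
TaskStorage implements resources.Storage, whose contract, as far as this patch shows, is: HasCapacity is a cheap scheduling hint, Claim runs during task acceptance and reserves space, and MarkComplete always eventually follows a successful Claim. A toy sketch of that lifecycle; the interface shape is inferred from the patch, not verified against the harmonytask sources:

package main

import "fmt"

// Storage mirrors the method set TaskStorage implements above.
type Storage interface {
	HasCapacity() bool
	Claim(taskID int) error
	MarkComplete(taskID int) error
}

// memStorage is a toy implementation tracking claims in a map.
type memStorage struct {
	claims map[int]bool
}

func (m *memStorage) HasCapacity() bool { return true }

func (m *memStorage) Claim(taskID int) error {
	if m.claims[taskID] {
		return fmt.Errorf("task %d already claimed", taskID)
	}
	m.claims[taskID] = true
	return nil
}

func (m *memStorage) MarkComplete(taskID int) error {
	if !m.claims[taskID] {
		return fmt.Errorf("no reservation found for task %d", taskID)
	}
	delete(m.claims, taskID)
	return nil
}

// runTask shows the assumed lifecycle: Claim during acceptance, then a
// MarkComplete that must eventually follow, whether or not the work ran.
func runTask(s Storage, taskID int, work func() error) error {
	if !s.HasCapacity() {
		return fmt.Errorf("no capacity for task %d", taskID)
	}
	if err := s.Claim(taskID); err != nil {
		return fmt.Errorf("claim: %w", err)
	}
	defer func() {
		if err := s.MarkComplete(taskID); err != nil {
			fmt.Printf("mark complete: %v\n", err)
		}
	}()
	return work()
}

func main() {
	s := &memStorage{claims: map[int]bool{}}
	fmt.Println(runTask(s, 1, func() error { return nil })) // <nil>
}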
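
taskToSector in task_sdr.go resolves a task ID to exactly one pipeline row and treats zero or multiple rows as an error. The same expect-exactly-one guard as a small generic helper; this is purely illustrative and not part of the patch:

package main

import "fmt"

// one returns the sole element of rows, erroring on zero or multiple
// results, mirroring the len(refs) != 1 check in taskToSector.
func one[T any](rows []T, what string) (T, error) {
	var zero T
	if len(rows) != 1 {
		return zero, fmt.Errorf("expected 1 %s, got %d", what, len(rows))
	}
	return rows[0], nil
}

func main() {
	ref, err := one([]string{"sector-1"}, "sector ref")
	fmt.Println(ref, err) // sector-1 <nil>

	_, err = one([]string{}, "sector ref")
	fmt.Println(err) // expected 1 sector ref, got 0
}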